﻿using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Lails.MQClient
{
    /// <summary>
    /// <see cref="MQClient"/> implementation on top of the RabbitMQ .NET client.
    /// Messages are published to, and consumed from, a single durable topic exchange;
    /// every subscribed topic gets its own queue named <c>{topic}_{ClientID}</c>.
    /// </summary>
    public class RabbitMQClient : MQClient
    {
        /// <summary>Name of the topic exchange every publish/subscribe goes through.</summary>
        private const string ExchangeName = "Lails.Demo.Topic";

        private readonly ConnectionFactory _factory;

        /// <summary>Guards the topology cache (<see cref="_queues"/>, <see cref="_exchange"/>).</summary>
        private readonly object _sync = new object();

        /// <summary>
        /// Queues already declared and bound on the current channel, mapped to the
        /// consumer tags registered on them (needed to cancel them on unsubscribe).
        /// </summary>
        private readonly Dictionary<string, List<string>> _queues = new Dictionary<string, List<string>>();

        private IModel _channel;
        private IConnection _conn;

        /// <summary>Exchange name once it has been declared on the current channel; null before that.</summary>
        private string _exchange;

        /// <summary>
        /// Creates the client and prepares (but does not open) the connection factory.
        /// </summary>
        /// <param name="address">Broker address, either <c>host</c> or <c>host:port</c>.</param>
        /// <param name="username">User name used to authenticate with the broker.</param>
        /// <param name="password">Password used to authenticate with the broker.</param>
        /// <param name="clientID">Client identifier; it is used as the suffix of every queue name.</param>
        /// <param name="appid">Virtual host to connect to; defaults to <c>"/"</c>.</param>
        /// <exception cref="FormatException">The port part of <paramref name="address"/> is not a number.</exception>
        public RabbitMQClient(string address, string username, string password, string clientID, string appid = "/")
            : base(address, username, password, clientID)
        {
            _factory = new ConnectionFactory
            {
                UserName = username,
                Password = password,
                VirtualHost = appid
            };

            if (address != null && address.Contains(":"))
            {
                string[] parts = address.Split(':');
                _factory.HostName = parts[0];

                // Only override the port when one was actually given ("host:" keeps the
                // factory's default port instead of forcing the invalid port 0).
                if (!string.IsNullOrEmpty(parts[1]))
                {
                    _factory.Port = Convert.ToInt32(parts[1]);
                }
            }
            else
            {
                _factory.HostName = address;
            }
        }

        /// <summary>
        /// Opens a connection and a channel on it.
        /// </summary>
        /// <returns>Always <c>true</c>; failures surface as exceptions from the RabbitMQ client.</returns>
        protected override bool _Connect()
        {
            lock (_sync)
            {
                _conn = _factory.CreateConnection();
                _channel = _conn.CreateModel();

                // A fresh channel has no consumers, and auto-delete queues of a previous
                // connection may be gone, so everything must be declared again lazily.
                ResetTopologyCache();
            }
            return true;
        }

        /// <summary>
        /// Closes the channel and the connection, if they were opened.
        /// </summary>
        /// <returns>Always <c>true</c>; failures surface as exceptions from the RabbitMQ client.</returns>
        protected override bool _DisConnect()
        {
            lock (_sync)
            {
                try
                {
                    _channel?.Close();
                }
                finally
                {
                    try
                    {
                        // Close the connection even when closing the channel failed.
                        _conn?.Close();
                    }
                    finally
                    {
                        ResetTopologyCache();
                    }
                }
            }
            return true;
        }

        /// <summary>
        /// Publishes a message to the topic exchange using <paramref name="topic"/> as routing key.
        /// </summary>
        /// <param name="topic">Routing key of the message.</param>
        /// <param name="body">Message payload.</param>
        /// <param name="arg">
        /// Optional publish options. When supplied, <c>arg.Durable</c> selects the delivery mode
        /// (2 = persistent, 1 = non-persistent); when null no basic properties are sent.
        /// </param>
        /// <returns>Always <c>true</c>; failures surface as exceptions from the RabbitMQ client.</returns>
        public override bool Publish(string topic, byte[] body, MessageArgs arg = null)
        {
            PreExchange();

            IBasicProperties properties = null;
            if (arg != null)
            {
                properties = _channel.CreateBasicProperties();
                properties.DeliveryMode = (byte)(arg.Durable ? 2 : 1);
            }

            _channel.BasicPublish(_exchange, topic, properties, body);
            return true;
        }

        /// <summary>
        /// Subscribes to <paramref name="topic"/>, handing only the raw payload to the callback.
        /// </summary>
        /// <param name="topic">Binding key of the subscription.</param>
        /// <param name="subscribe">Callback invoked per message; the message is acknowledged only when it returns <c>true</c>.</param>
        /// <param name="arg">Not used by this implementation.</param>
        /// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        [Obsolete("该方法重载已过期，请使用：Subscribe(string topic, Func<MessageData, bool> subscribe, MessageArgs arg = null)")]
        public override bool Subscribe(string topic, Func<byte[], bool> subscribe, MessageArgs arg = null)
        {
            if (subscribe == null)
            {
                throw new ArgumentNullException(nameof(subscribe));
            }

            return SubscribeCore(topic, ea => subscribe(ea.Body));
        }

        /// <summary>
        /// Subscribes to <paramref name="topic"/>, handing the routing key and payload to the callback.
        /// </summary>
        /// <param name="topic">Binding key of the subscription.</param>
        /// <param name="subscribe">Callback invoked per message; the message is acknowledged only when it returns <c>true</c>.</param>
        /// <param name="arg">Not used by this implementation.</param>
        /// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        public override bool Subscribe(string topic, Func<MessageData, bool> subscribe, MessageArgs arg = null)
        {
            if (subscribe == null)
            {
                throw new ArgumentNullException(nameof(subscribe));
            }

            return SubscribeCore(topic, ea => subscribe(new MessageData() { Topic = ea.RoutingKey, Data = ea.Body }));
        }

        /// <summary>
        /// Removes the subscription for <paramref name="topic"/>: unbinds its queue from the
        /// exchange and cancels every consumer this client registered on it, so a later
        /// <c>Subscribe</c> for the same topic declares and binds the queue again.
        /// </summary>
        /// <param name="topic">Binding key that was passed to <c>Subscribe</c>.</param>
        /// <returns>
        /// <c>true</c> when a subscription was removed; <c>false</c> when this client holds no
        /// subscription for the topic on the current channel (the broker is not contacted).
        /// </returns>
        public override bool UnSubscribe(string topic)
        {
            string queue = QueueNameFor(topic);
            lock (_sync)
            {
                List<string> consumerTags;
                if (!_queues.TryGetValue(queue, out consumerTags))
                {
                    return false;
                }

                // Unbind first: the queue is auto-delete, so it may disappear as soon as
                // its last consumer is cancelled.
                _channel.QueueUnbind(queue, _exchange, topic, null);
                _queues.Remove(queue);

                foreach (string consumerTag in consumerTags)
                {
                    _channel.BasicCancel(consumerTag);
                }
            }
            return true;
        }

        /// <summary>
        /// Shared implementation of both <c>Subscribe</c> overloads: makes sure the queue for
        /// the topic exists and is bound, then attaches a manually-acknowledging consumer.
        /// </summary>
        /// <param name="topic">Binding key of the subscription.</param>
        /// <param name="handler">Invoked per delivery; returning <c>true</c> acknowledges the message.</param>
        /// <returns>Always <c>true</c>.</returns>
        private bool SubscribeCore(string topic, Func<BasicDeliverEventArgs, bool> handler)
        {
            string queue = QueueNameFor(topic);
            lock (_sync)
            {
                PreExchange();

                List<string> consumerTags;
                if (!_queues.TryGetValue(queue, out consumerTags))
                {
                    // Arguments: queue name, durable, exclusive (private to this connection),
                    // auto-delete (removed once its last consumer is gone), extra arguments.
                    _channel.QueueDeclare(queue, true, false, true, null);
                    _channel.QueueBind(queue, _exchange, topic, null);

                    consumerTags = new List<string>();
                    _queues.Add(queue, consumerTags);
                }

                // Capture the channel so acknowledgements always go back through the channel
                // that delivered the message, even if the client reconnects later.
                IModel channel = _channel;
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (ch, ea) =>
                {
                    if (handler(ea))
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                };

                // Start consuming with manual acknowledgement and remember the tag for UnSubscribe.
                consumerTags.Add(channel.BasicConsume(queue, false, consumer));
            }
            return true;
        }

        /// <summary>Builds the per-client queue name used for a topic.</summary>
        private string QueueNameFor(string topic)
        {
            return $"{topic}_{ClientID}";
        }

        /// <summary>
        /// Declares the durable topic exchange on the current channel the first time it is needed.
        /// </summary>
        private void PreExchange()
        {
            lock (_sync)
            {
                if (_exchange == null)
                {
                    _channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, durable: true);

                    // Only remember the exchange once the declaration succeeded.
                    _exchange = ExchangeName;
                }
            }
        }

        /// <summary>Forgets everything declared on the previous channel.</summary>
        private void ResetTopologyCache()
        {
            _queues.Clear();
            _exchange = null;
        }

        /// <summary>
        /// Rewrites every <c>".*"</c> in the topic to <c>". "</c> (dot followed by a space).
        /// </summary>
        /// <remarks>
        /// NOTE(review): the intent of this substitution is not visible in this file
        /// (presumably wildcard translation for the base class) - confirm against the caller.
        /// </remarks>
        /// <param name="topic">Topic to rewrite.</param>
        /// <returns>The rewritten topic.</returns>
        protected override string FormatTopic(string topic)
        {
            topic = topic.Replace(".*", ". ");
            return topic;
        }
    }
}
